/*** Eclipse Class Decompiler plugin, copyright (c) 2012 Chao Chen (cnfree2000@hotmail.com) ***/
package com.sankuai.xm.kafka.client.factory;

import com.sankuai.xm.kafka.client.IProducerProcessor;
import com.sankuai.xm.kafka.client.KafkaClient;
import com.sankuai.xm.kafka.client.exception.ProducerConfigException;
import com.sankuai.xm.kafka.client.utils.StackTraceUtil;
import java.util.Properties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Builds {@link IProducerProcessor} instances from the configuration loaded
 * via {@code KafkaBuildFactory.init0("producer.properties")}.
 *
 * <p>
 * Three values are taken from the loaded configuration (topic, partitioner
 * class and broker list). They are combined with a fixed set of producer
 * settings and handed to
 * {@link KafkaClient#buildProduceFactory(Properties)}.
 */
public class KafkaProducerBuildFactory extends KafkaBuildFactory {
	private static final Logger log = LoggerFactory
			.getLogger(KafkaProducerBuildFactory.class);

	/** Configuration name passed to {@code KafkaBuildFactory.init0}. */
	private static final String CONFIG_FILE = "producer.properties";

	/** Default lookup key for the topic; also the key the topic is stored under for the client. */
	private static final String TOPIC_KEY = "kafka.topic";

	/** Key of the partitioner class, both in the configuration and for the client. */
	private static final String PARTITIONER_KEY = "partitioner.class";

	/** Default lookup key for the broker list; also the key it is stored under for the client. */
	private static final String BROKER_LIST_KEY = "metadata.broker.list";

	/**
	 * Builds a producer using the default keys {@code kafka.topic} and
	 * {@code metadata.broker.list}.
	 *
	 * @return the producer returned by
	 *         {@link KafkaClient#buildProduceFactory(Properties)}
	 * @throws Exception
	 *             see {@link #init(String, String)}
	 */
	public static IProducerProcessor init() throws Exception {
		return init(TOPIC_KEY, BROKER_LIST_KEY);
	}

	/**
	 * Builds a producer, reading the topic and the broker list from
	 * caller-chosen keys of the loaded configuration. The partitioner class is
	 * always read from {@code partitioner.class}.
	 *
	 * @param topicKey
	 *            configuration key whose value is the topic
	 * @param brokerKey
	 *            configuration key whose value is the broker list
	 * @return the producer returned by
	 *         {@link KafkaClient#buildProduceFactory(Properties)}
	 * @throws Exception
	 *             a {@link ProducerConfigException} when a key is
	 *             {@code null}, a required value is missing, or building the
	 *             producer fails; anything thrown while loading the
	 *             configuration propagates unchanged
	 */
	public static IProducerProcessor init(String topicKey, String brokerKey)
			throws Exception {
		Properties properties = KafkaBuildFactory.init0(CONFIG_FILE);

		// Validation order (topic, partitioner, broker list) decides which
		// problem is reported first when several values are missing.
		String topic = requireProperty(properties, topicKey, "Topic");
		String partitioner = requireProperty(properties, PARTITIONER_KEY,
				"Partitioner class");
		String brokerlist = requireProperty(properties, brokerKey,
				"Broker list");

		Properties initProperties = buildInitProperties(topic, partitioner,
				brokerlist);
		try {
			return KafkaClient.buildProduceFactory(initProperties);
		} catch (Exception e) {
			log.error(
					"xm-kafka-client, Kafka Producer build Exception. exception : {}",
					StackTraceUtil.getStackTrace(e));
			// Keep the original failure as the cause.
			throw new ProducerConfigException(e);
		}
	}

	/**
	 * Returns the value stored under {@code key}, failing with a
	 * {@link ProducerConfigException} when the key itself is {@code null} or
	 * no value is present.
	 *
	 * @param properties
	 *            loaded configuration
	 * @param key
	 *            key to look up; named in the error log so the message
	 *            reflects the key that was actually queried
	 * @param description
	 *            human-readable name of the setting, used in the exception
	 *            message
	 */
	private static String requireProperty(Properties properties, String key,
			String description) throws ProducerConfigException {
		if (key == null) {
			// Properties.get(null) would fail with a bare NullPointerException.
			log.error("xm-kafka-client, property key for kafka {} is null.",
					description);
			throw new ProducerConfigException("kafka " + description
					+ " key is null, can't init kafka client.");
		}

		String value = (String) properties.get(key);
		if (value == null) {
			log.error("xm-kafka-client, {} is null.", key);
			throw new ProducerConfigException("kafka " + description
					+ " is null, can't init kafka client.");
		}
		return value;
	}

	/**
	 * Assembles the properties passed to the client: the three configured
	 * values plus the fixed producer settings applied to every producer
	 * created by this factory.
	 */
	private static Properties buildInitProperties(String topic,
			String partitioner, String brokerlist) {
		Properties initProperties = new Properties();
		initProperties.setProperty(TOPIC_KEY, topic);
		initProperties.setProperty(PARTITIONER_KEY, partitioner);
		initProperties.setProperty(BROKER_LIST_KEY, brokerlist);

		// Hard-coded producer settings; not overridable from the
		// configuration file.
		initProperties.setProperty("producer.type", "async");
		initProperties.setProperty("request.required.acks", "1");
		initProperties.setProperty("queue.buffering.max.messages", "10000");
		initProperties.setProperty("queue.buffering.max.ms", "1");
		initProperties.setProperty("batch.num.messages", "1000");
		return initProperties;
	}
}